class: right, inverse, title-slide

.title[
# Using R on HPC Clusters Part 2
]
.author[
### George Ostrouchov
]
.institute[
### Oak Ridge National Laboratory
]
.date[
###
August 19, 2022
Background Image: FRONTIER, First Top500 exascale system, announced June 2022
]

---

# Get this presentation:

`git clone https://github.com/RBigData/R4HPC.git`

* Open `R4HPC_Part2.html` in your web browser
* `?` toggles help

<br>

The Slack workspace link for this workshop was emailed to you.

<br><br><br>

<small>
*Many thanks to my colleagues and former colleagues who contributed to the software and ideas presented here. See the RBigData Organization on GitHub: https://github.com/RBigData. Also, many thanks to all R developers of packages used in this presentation.*

*This manuscript has been authored by UT-Battelle, LLC under Contract No. DE-AC05-00OR22725 with the U.S. Department of Energy. The United States Government retains and the publisher, by accepting the article for publication, acknowledges that the United States Government retains a non-exclusive, paid-up, irrevocable, world-wide license to publish or reproduce the published form of this manuscript, or allow others to do so, for United States Government purposes. The Department of Energy will provide public access to these results of federally sponsored research in accordance with the DOE Public Access Plan (http://energy.gov/downloads/doe-public-access-plan).*
</small>

---

# Today's Package Installs

## See the `R4HPC/code_4` R script and shell scripts for your machine

---

## Running MPI on a Laptop

macOS, in a Terminal window:
* `brew install openmpi`
* `mpirun -np 4 Rscript your_spmd_code.R`

Windows:
* Web page: https://docs.microsoft.com/en-us/message-passing-interface/microsoft-mpi
* Download: https://www.microsoft.com/en-us/download/details.aspx?id=100593
* `pbdMPI` has a Windows binary on CRAN

---

## Revisit `hello_balance.R` in `code_1`

```r
suppressMessages(library(pbdMPI))
comm.print(sessionInfo())

## get node name
host = system("hostname", intern = TRUE)

mc.function = function(x) {
  Sys.sleep(1) # replace with your function for mclapply cores here
  Sys.getpid() # returns process id
}

## Compute how many cores per R session are on this node
local_ranks_query = "echo $OMPI_COMM_WORLD_LOCAL_SIZE"
ranks_on_my_node = as.numeric(system(local_ranks_query, intern = TRUE))
cores_on_my_node = parallel::detectCores()
cores_per_R = floor(cores_on_my_node / ranks_on_my_node)
cores_total = allreduce(cores_per_R) # adds up over ranks

## Run mclapply on allocated cores to demonstrate fork pids
my_pids = parallel::mclapply(1:cores_per_R, mc.function, mc.cores = cores_per_R)
my_pids = do.call(paste, my_pids) # combines results from mclapply

## The same cores are shared with OpenBLAS (see flexiblas package)
## or with other OpenMP-enabled code outside mclapply.
## If BLAS functions are called inside mclapply, they compete for the
## same cores: avoid or manage appropriately!

## Now report what happened and where
msg = paste0("Hello World from rank ", comm.rank(), " on host ", host,
             " with ", cores_per_R, " cores allocated\n",
             "   (", ranks_on_my_node, " R sessions sharing ",
             cores_on_my_node, " cores on this host node).\n",
             "   pid: ", my_pids, "\n")
comm.cat(msg, quiet = TRUE, all.rank = TRUE)
comm.cat("Total R sessions:", comm.size(), "Total cores:", cores_total, "\n",
         quiet = TRUE)
comm.cat("\nNotes: cores on node obtained by: detectCores {parallel}\n",
         "      ranks (R sessions) per node: OMPI_COMM_WORLD_LOCAL_SIZE\n",
         "      pid to core map changes frequently during mclapply\n",
         quiet = TRUE)

finalize()
```

---

## Working with a remote cluster using R

<img src="data:image/png;base64,#pics/01-intro/Workflow.jpg" width="3168" height="500" />

---
background-image: url(data:image/png;base64,#pics/01-intro/WorkflowRunning.jpg)
background-position: top right
background-size: 20%

## Running Distributed on a Cluster

<img src="data:image/png;base64,#pics/01-intro/BatchRonCluster.jpg" width="3428" height="500" />

---

## Section I: Environment and Workflow

## Section II: Parallel Hardware and Software Overview

## Section III: Shared Memory Tools

## <mark>Section IV:</mark> **Distributed Memory Tools**

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7distributed.jpg)
background-position: bottom
background-size: 90%

# Distributed Memory Tools

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: bottom
background-size: 90%

# Message Passing Interface (MPI)

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## Single Program Multiple Data (SPMD)

* N instances of the same code cooperate
* Each of the N instances has a `rank`, {0, ..., N-1}
* The `rank` determines any differences in work
* Instances run asynchronously
* SPMD parallelization is a generalization of the serial code
* Many rank-aware operations are automated
* Collective operations are high level and easy to learn
* Explicit point-to-point communications are an advanced topic
* Multilevel parallelism is possible
* Typically no manager; it is all cooperation

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

# pbdR Project

<img src="data:image/png;base64,#pics/01-intro/pbdRlib.jpg" width="800" height="180" style="display: block; margin: auto auto auto 0;" />

* Bridge HPC with the high productivity of R: expressive for data and modern statistics
* Keep syntax identical to R, when possible
* Software reuse philosophy:
  * Don't reinvent the wheel when possible
  * Introduce HPC standards with an R flavor
  * Use scalable HPC libraries with R convenience
  * Simplify and use R intelligence where possible

???
Using HPC concepts and libraries
* Benefits the R user by learning standard components of HPC

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

<img src="data:image/png;base64,#pics/01-intro/pbdRlib.jpg" width="300" height="80" style="display: block; margin: auto auto auto 0;" />

# Package `pbdMPI`

* Specializes in SPMD programming for HPC clusters
* Manages printing from ranks
* Provides chunking options
* Provides communicator splits for multilevel parallelism
* In situ capability to process data from other MPI codes without a copy
* A derivation and rethinking of the `Rmpi` package, aimed at HPC clusters
  * Simplified interface with fewer parameters (using R's S4 methods)
  * Faster for matrix and array data - no serialization

???
* Prefer pbdMPI over Rmpi due to simplification and speed
  * No serialization for arrays and vectors
  * Drops spawning a cluster
    * Because a client-server relationship is more appropriate

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## `pbdMPI`: High-level Collective Communications

Each of these operations is performed across a `communicator` of ranks. The simplest communicator is all ranks, but rank arrays can be used for multilevel collectives.

* **`reduce()`** Reduces a set of same-size distributed vectors or arrays with an operation (`+` is the default). Fast because both communication and reduction are parallel and no serialization is needed.
* **`allreduce()`** Same as `reduce()` except that all ranks in a `comm` get the result
* **`gather()`** Gathers a set of distributed objects
* **`allgather()`** Same as `gather()` except that all ranks in a `comm` get the result
* **`bcast()`** Broadcasts an object from one rank to all others in its `comm`
* **`scatter()`** Sends different pieces of an object from one rank to all others in its `comm`
* **`barrier()`** Waits on all ranks in a `comm` before proceeding

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## `pbdMPI`: High-level Collective Operations

`\(\small \bf A = \sum_{i=1}^n X_i\)` `\(\quad\)` `\(\qquad\)` `\(\qquad\)` **`A = reduce(X)`** `\(\qquad\)` `\(\qquad\)` **`A = allreduce(X)`**

`\(\small \bf A = \left[ X_1 | X_2 | \cdots | X_n \right]\)` `\(\qquad\)` **`A = gather(X)`** `\(\qquad\)` `\(\qquad\)` **`A = allgather(X)`**

<img src="data:image/png;base64,#pics/01-intro/RHistory3sub.png" width="1268" height="250" style="display: block; margin: auto 0 auto auto;" />

???
* Powerful: communication and reduction are highly parallel
  * that's why it beats Spark/MapReduce

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## `pbdMPI`: Functions to Facilitate SPMD Programming

* **`comm.chunk()`** splits a number into chunks in various ways and formats. Tailored for SPMD programming, returning rank-specific results.
* **`comm.set.seed()`** sets the seed of a parallel RNG. If `diff = FALSE`, all ranks generate the same stream; otherwise, ranks generate different streams.
* **`comm.print()`** and **`comm.cat()`** print from rank 0 only by default, with options to print from any or all ranks.
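A minimal sketch of how these helpers combine in one SPMD script: a hypothetical Monte Carlo estimate of `\(\pi\)` (not one of the workshop files; assumes `pbdMPI` and an MPI runtime are installed, e.g. run with `mpirun -np 4 Rscript pi_mc.r`).

```r
suppressMessages(library(pbdMPI))

## Each rank gets its share of a one-million-point sample
n = comm.chunk(1e6)                      # rank-specific chunk size
comm.set.seed(seed = 12345, diff = TRUE) # independent RNG streams per rank

## Count uniform points falling inside the unit quarter-circle
x = runif(n)
y = runif(n)
in_circle = sum(x^2 + y^2 <= 1)

## Sum counts across all ranks; every rank gets the totals
total_in = allreduce(in_circle) # default op is sum
total_n = allreduce(n)

## Report once, from rank 0 only
comm.cat("pi estimate:", 4 * total_in / total_n, "\n", quiet = TRUE)

finalize()
```

The same script runs unchanged on 1 rank or 1000; only `comm.chunk()`'s answer and the reduction change.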
---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide6.png)
background-position: bottom
background-size: 90%

## Distributed Programming Works in Shared Memory

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## Hands on Session 5: Hello MPI Ranks

`code_5/hello_world.R`

```r
suppressMessages(library(pbdMPI))

my_rank = comm.rank()
nranks = comm.size()
msg = paste0("Hello World! My name is Rank ", my_rank,
             ". We are ", nranks, " identical siblings.")
cat(msg, "\n")

finalize()
```

### **Rank** distinguishes the parallel copies of the same code

---

### Hands-on Session 5 - `R4HPC/code_2/rf_serial.R`

```r
suppressMessages(library(randomForest))
data(LetterRecognition, package = "mlbench")
set.seed(seed = 123)

## A 20% random sample for testing, the rest for training
n = nrow(LetterRecognition)
n_test = floor(0.2 * n)
i_test = sample.int(n, n_test)
train = LetterRecognition[-i_test, ]
test = LetterRecognition[i_test, ]

rf.all = randomForest(lettr ~ ., train, ntree = 500, norm.votes = FALSE)
pred = predict(rf.all, test)

correct = sum(pred == test$lettr)
cat("Proportion Correct:", correct / n_test, "\n")
```

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## Hands on Session 5: Random Forest with MPI

`code_5/rf_mpi.R`

```r
suppressPackageStartupMessages(library(randomForest))
data(LetterRecognition, package = "mlbench")
*library(pbdMPI, quiet = TRUE)
*comm.set.seed(seed = 7654321, diff = FALSE)

n = nrow(LetterRecognition)
n_test = floor(0.2 * n)
i_test = sample.int(n, n_test)
train = LetterRecognition[-i_test, ]
*test = LetterRecognition[i_test, ][comm.chunk(n_test, form = "vector"), ]

*comm.set.seed(seed = 1234, diff = TRUE)
*my.rf = randomForest(lettr ~ ., train, ntree = comm.chunk(500), norm.votes = FALSE)
*rf.all = allgather(my.rf)
*rf.all = do.call(combine, rf.all)

pred = as.vector(predict(rf.all,
test))
*correct = allreduce(sum(pred == test$lettr))
comm.cat("Proportion Correct:", correct / n_test, "\n")

*finalize()
```

---

## Hands on Session 5: `comm.chunk()`

`mpi_shorts/chunk.r`

```r
library(pbdMPI, quiet = TRUE)

my.rank = comm.rank()

k = comm.chunk(10)
comm.cat(my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE)

k = comm.chunk(10, form = "vector")
comm.cat(my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE)

k = comm.chunk(10, form = "vector", type = "equal")
comm.cat(my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE)

finalize()
```

---

## Hands on Session 5: other short MPI codes

bcast.r chunk.r comm_split.R cov.r gather-named.r gather.r gather-unequal.r hello-p.r hello.r map-reduce.r mcsim.r ols.r qr-cop.r rank.r reduce-mat.r timer.r

* These short codes use only `pbdMPI` and can run on a laptop in a terminal window if you installed OpenMPI
* On the clusters, they can run on a login node with a small`\(^*\)` number of ranks
* While in the `mpi_shorts` directory, run the following:
  * `source ../code_4/modules_MACHINE.sh`
  * `mpirun -np 4 Rscript your_script.r`

.footnote[
`\(^*\)` Note that running long or large jobs on login nodes is strongly discouraged
]

---

## Shared Memory - MPI or fork?
.w80.pull-left[
* fork via `mclapply()` + `do.call()` combine

<img src="data:image/png;base64,#pics/mpi/fork.jpg" width="130" height="150" />

* MPI replicated data + `allreduce()`

<img src="data:image/png;base64,#pics/mpi/mpi-replicate.png" width="1832" height="150" />

* MPI chunked data + `allreduce()`

<img src="data:image/png;base64,#pics/mpi/mpi-partition.png" width="1820" height="58" />
]
.w20.pull-right[
`do.call()` is serial

`allreduce()` is parallel
]

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

<img src="data:image/png;base64,#pics/01-intro/pbdRlib.jpg" width="300" height="100" style="display: block; margin: auto auto auto 0;" />

# Package `pbdDMAT`

* ScaLAPACK: distributed version of LAPACK (uses PBLAS/BLAS but not LAPACK)
* 2d block-cyclic data layout - mostly automated in the `pbdDMAT` package
* BLACS: communication collectives for distributed matrix computation
* PBLAS: distributed BLAS (uses standard BLAS within blocks)
* R code is identical for most matrix operations, by overloading operators for the `ddmatrix` class

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

<img src="data:image/png;base64,#pics/01-intro/pbdRlib.jpg" width="300" height="100" style="display: block; margin: auto auto auto 0;" />

# Package `pbdML`

* A demonstration of `pbdDMAT` package capabilities
* Includes
  * Randomized SVD
  * Randomized principal components analysis
  * Robust principal component analysis ("Robust Principal Component Analysis?", https://arxiv.org/pdf/0912.3599.pdf)

---

## Hands on Session `\(\quad\)` rsvd:

#### Singular value decomposition via randomized sketching

<br><br>
Randomized sketching produces fast new alternatives to classical numerical linear algebra computations.
<br>
Guarantees are given with probability statements instead of classical error analysis.
<br><br>
Martinsson, P., & Tropp, J. (2020).
Randomized numerical linear algebra: Foundations and algorithms. Acta Numerica, 29, 403-572. [https://doi.org/10.48550/arXiv.2002.01387](https://doi.org/10.48550/arXiv.2002.01387)

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## Hands on Session `\(\quad\)` rsvd:

#### Randomized SVD via subspace embedding

Given an `\(n\times p\)` matrix `\(X\)` and `\(k = r + 10\)`, where `\(r\)` is the *effective rank* of `\(X\)`:
1. Construct a `\(p\times k\)` random matrix `\(\Omega\)`
2. Form `\(Y = X \Omega\)`
3. Decompose `\(Y = QR\)`

`\(Q\)` is an orthogonal basis for the column space of `\(Y\)`, which with high probability is the column space of `\(X\)`.

To get the SVD of `\(X\)`:
1. Compute `\(C = Q^TX\)`
2. Decompose `\(C = \hat{U}\Sigma V^T\)`
3. Compute `\(U = Q\hat{U}\)`
4. Truncate the factorization to `\(r\)` columns

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

## MNIST Data

<img src="data:image/png;base64,#pics/mnist/Rplots.png" width="672" height="500" style="display: block; margin: auto auto auto 0;" />

---

`mnist_rsvd.R`

```r
source("mnist_read_mpi.R") # reads blocks of rows
suppressMessages(library(pbdDMAT))
suppressMessages(library(pbdML))
init.grid()

## construct a block-cyclic ddmatrix
bldim = c(allreduce(nrow(my_train), op = "max"), ncol(my_train))
gdim = c(allreduce(nrow(my_train), op = "sum"), ncol(my_train))
dmat_train = new("ddmatrix", Data = my_train, dim = gdim,
                 ldim = dim(my_train), bldim = bldim, ICTXT = 2)
cyclic_train = as.blockcyclic(dmat_train)

comm.print(comm.size())
t1 = as.numeric(Sys.time())
rsvd_train = rsvd(cyclic_train, k = 10, q = 3, retu = FALSE, retvt = TRUE)
t2 = as.numeric(Sys.time())
t1 = allreduce(t1, op = "min")
t2 = allreduce(t2, op = "max")

comm.cat("Time:", t2 - t1, "seconds\n")
comm.cat("dim(V):", dim(rsvd_train$vt), "\n")
comm.cat("rsvd top 10 singular values:", rsvd_train$d, "\n")

finalize()
```

---

### Hands-on Session `\(\quad\)` rsvd

```r
rsvd <- function(x, k = 1, q = 3, retu = TRUE, retvt = TRUE) {
  n <- ncol(x)
  if (is.matrix(x))
    Omega <- matrix(runif(n * 2L * k), nrow = n, ncol = 2L * k)
* else if (is(x, "ddmatrix"))
*   Omega <- ddmatrix("runif", nrow = n, ncol = 2L * k,
*                     bldim = x@bldim, ICTXT = x@ICTXT)

  Y <- x %*% Omega
  Q <- qr.Q(qr(Y))
  for (i in 1:q) { # power iterations to sharpen the subspace
    Y <- crossprod(x, Q)
    Q <- qr.Q(qr(Y))
    Y <- x %*% Q
    Q <- qr.Q(qr(Y))
  }

  B <- crossprod(Q, x)
  if (!retu) nu <- 0 else nu <- min(nrow(B), ncol(B))
  if (!retvt) nv <- 0 else nv <- min(nrow(B), ncol(B))

  svd.B <- La.svd(x = B, nu = nu, nv = nv)
  d <- svd.B$d
  d <- d[1L:k]

  # Produce u/vt as desired
  if (retu) {
    u <- svd.B$u
    u <- Q %*% u
    u <- u[, 1L:k, drop = FALSE]
  }
  if (retvt) vt <- svd.B$vt[1L:k, , drop = FALSE]

  # wrangle return
  if (retu) {
    if (retvt) svd <- list(d = d, u = u, vt = vt)
    else svd <- list(d = d, u = u)
  } else {
    if (retvt) svd <- list(d = d, vt = vt)
    else svd <- list(d = d)
  }
  return(svd)
}
```

---
background-image: url(data:image/png;base64,#pics/Mangalore/ParallelSoftware/Slide7mpi.jpg)
background-position: top right
background-size: 20%

<img src="data:image/png;base64,#pics/01-intro/pbdRlib.jpg" width="300" height="100" style="display: block; margin: auto auto auto 0;" />

# Package `kazaam`

* Distributed methods for tall matrices (and some for wide matrices) that exploit the short dimension for speed and the long dimension for parallelism
* Tall matrices, the `shaq` class, are chunked by blocks of rows
* Wide matrices, the `tshaq` class, are chunked by blocks of columns
* Much like `pbdDMAT`, most matrix operations in R code are identical to serial code through overloading operators for the `shaq` `S4` class

.footnote[
The name is a "tongue-in-cheek" play on Shaquille O'Neal ('Shaq') and the film 'Kazaam'
]

---

# Hands on Session `kazaam`

---

# Acknowledgments

*This research used resources of the Oak Ridge Leadership Computing Facility, which is a DOE Office of Science User Facility supported
under Contract DE-AC05-00OR22725.*

*This research used resources of the Compute and Data Environment for Science (CADES) at the Oak Ridge National Laboratory, which is supported by the Office of Science of the U.S. Department of Energy under Contract No. DE-AC05-00OR22725.*

*Slides are made with the xaringan R package. It is an R Markdown extension based on the JavaScript library remark.js.*